package xin.marcher.wind.migrate.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import xin.marcher.wind.migrate.domain.RangeScroll;
import xin.marcher.wind.migrate.migrate.CheckDataProcessor;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Scheduled task that periodically verifies migrated data: every batch returned by
 * {@code CheckDataProcessor.queryCheckDataList()} is checked, then the corresponding
 * migration records are updated.
 *
 * <p>NOTE(review): the {@code @Component} annotation below is commented out, so this file
 * alone does not register the class through component scanning — confirm whether it is
 * registered as a bean elsewhere.
 */
@Slf4j
//@Component
public class CheckDataTask {

    /**
     * Guards a check run; an invocation that cannot acquire it returns without checking.
     */
    private final Lock lock = new ReentrantLock();

    /**
     * Batches seen during the current run, keyed by table name + ticket.
     * Only read or modified while {@link #lock} is held.
     */
    private final Map<String, RangeScroll> queryEtlProgressMap = new HashMap<>();

    /**
     * Runs one data verification pass.
     *
     * <p>Scheduled with a fixed delay of 120000 ms. Any exception raised during the pass is
     * logged and not rethrown.
     */
    @Scheduled(fixedDelay = 120000)
    public void task() {
        log.info("数据核对校验开始");
        if (!lock.tryLock()) {
            // Another run still holds the lock; skip this invocation.
            return;
        }
        try {
            CheckDataProcessor checkDataProcessor = CheckDataProcessor.getInstance();

            // Fetch the batches to verify and check each one.
            List<RangeScroll> rangeScrollList = checkDataProcessor.queryCheckDataList();
            for (RangeScroll rangeScroll : rangeScrollList) {
                // Keep only the first batch seen for each table + ticket combination.
                String key = rangeScroll.getTableName() + rangeScroll.getTicket();
                queryEtlProgressMap.putIfAbsent(key, rangeScroll);

                checkDataProcessor.checkData(rangeScroll);
            }

            // Update the migration records of the verified batches.
            checkDataProcessor.updateEtlProgressCheckSuccess(queryEtlProgressMap);
        } catch (Exception e) {
            log.error("数据核对过程中发生异常 {}", e.getMessage(), e);
        } finally {
            log.info("数据核对校验结束");
            // Reset the shared map BEFORE releasing the lock: clearing it after unlock()
            // would mutate a non-thread-safe HashMap outside the lock and could wipe
            // entries added by a run that acquired the lock in the meantime.
            queryEtlProgressMap.clear();
            lock.unlock();
        }
    }
}
